/**
 * Tests the cloning portion of a resharding operation in isolation.
 *
 * @tags: [
 *   uses_atclustertime,
 * ]
 */
import {ShardingTest} from "jstests/libs/shardingtest.js";
import {extractUUIDFromObject, getUUIDFromListCollections} from "jstests/libs/uuid_util.js";
import {CreateShardedCollectionUtil} from "jstests/sharding/libs/create_sharded_collection_util.js";

const st = new ShardingTest({
    mongos: 1,
    config: 1,
    shards: 2,
    rs: {nodes: 3},
    rsOptions: {
        setParameter: {
            "failpoint.WTPreserveSnapshotHistoryIndefinitely": tojson({mode: "alwaysOn"}),
            logComponentVerbosity: tojson({sharding: {reshard: 2}}),
        }
    },
});

const inputCollection = st.s.getCollection("reshardingDb.coll");

CreateShardedCollectionUtil.shardCollectionWithChunks(inputCollection, {oldKey: 1}, [
    {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: st.shard0.shardName},
    {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: st.shard1.shardName},
]);

const inputCollectionUUID =
    getUUIDFromListCollections(inputCollection.getDB(), inputCollection.getName());
const inputCollectionUUIDString = extractUUIDFromObject(inputCollectionUUID);

const temporaryReshardingCollection =
    st.s.getCollection(`reshardingDb.system.resharding.${inputCollectionUUIDString}`);

CreateShardedCollectionUtil.shardCollectionWithChunks(temporaryReshardingCollection, {newKey: 1}, [
    {min: {newKey: MinKey}, max: {newKey: 0}, shard: st.shard0.shardName},
    {min: {newKey: 0}, max: {newKey: MaxKey}, shard: st.shard1.shardName},
]);

// The shardCollection command doesn't wait for the config.cache.chunks entries to have been written
// on the primary shard for the database. We manually run the _flushRoutingTableCacheUpdates command
// to guarantee they have been written and are visible with the atClusterTime used by the
// testReshardCloneCollection command.
for (const shard of [st.shard0, st.shard1]) {
    assert.commandWorked(shard.rs.getPrimary().adminCommand(
        {_flushRoutingTableCacheUpdates: temporaryReshardingCollection.getFullName()}));
}

assert.commandWorked(inputCollection.insert([
    {_id: "stays on shard0", oldKey: -10, newKey: -10},
    {_id: "moves to shard0", oldKey: 10, newKey: -10},
    {_id: "moves to shard1", oldKey: -10, newKey: 10},
    {_id: "stays on shard1", oldKey: 10, newKey: 10},
]));

const atClusterTime = inputCollection.getDB().getSession().getOperationTime();

assert.commandWorked(inputCollection.insert([
    {_id: "not visible, but would stay on shard0", oldKey: -10, newKey: -10},
    {_id: "not visible, but would move to shard0", oldKey: 10, newKey: -10},
    {_id: "not visible, but would move to shard1", oldKey: -10, newKey: 10},
    {_id: "not visible, but would stay on shard1", oldKey: 10, newKey: 10},
]));

// We wait for the "not visible" inserts to become majority-committed on all members of the replica
// set shards. This isn't necessary for the test's correctness but makes it more likely that the
// test would fail if ReshardingCollectionCloner wasn't specifying atClusterTime in its read
// concern.
st.shard0.rs.awaitLastOpCommitted();
st.shard1.rs.awaitLastOpCommitted();

function testReshardCloneCollection(shard, expectedDocs) {
    const dbName = inputCollection.getDB().getName();
    const allNodes = [...st.shard0.rs.nodes, ...st.shard1.rs.nodes];

    for (const node of allNodes) {
        node.getDB(dbName).setProfilingLevel(2);
    }

    const reshardCmd = {
        testReshardCloneCollection: inputCollection.getFullName(),
        shardKey: {newKey: 1},
        uuid: inputCollectionUUID,
        shardId: shard.shardName,
        atClusterTime: atClusterTime,
        outputNs: temporaryReshardingCollection.getFullName(),
    };
    jsTestLog({"Node": shard.rs.getPrimary(), "ReshardingCmd": reshardCmd});
    assert.commandWorked(shard.rs.getPrimary().adminCommand(reshardCmd));

    for (const node of allNodes) {
        node.getDB(dbName).setProfilingLevel(0);
    }

    // We sort by oldKey so the order of `expectedDocs` can be deterministic.
    assert.eq(expectedDocs,
              shard.rs.getPrimary()
                  .getCollection(temporaryReshardingCollection.getFullName())
                  .find()
                  .sort({oldKey: 1})
                  .toArray());

    // Verify the ReshardingCollectionCloner is sending its aggregation requests with a logical
    // session ID to prevent idle cursors from being timed out by the CursorManager.
    for (const donorShard of [st.shard0, st.shard1]) {
        const profilerEntries = donorShard.rs.nodes
                                    .map(node => node.getDB(dbName).system.profile.findOne({
                                        op: "command",
                                        ns: inputCollection.getFullName(),
                                        "command.aggregate": inputCollection.getName(),
                                        "command.collectionUUID": inputCollectionUUID,
                                    }))
                                    .filter(entry => entry !== null);

        assert.neq([],
                   profilerEntries,
                   `expected to find collection cloning aggregation in profiler output of some ${
                       donorShard.shardName} node`);

        for (const entry of profilerEntries) {
            assert(entry.command.hasOwnProperty("lsid"),
                   "expected profiler entry for collection cloning aggregation to have a logical" +
                       ` session ID: ${tojson(entry)}`);
        }
    }
}

testReshardCloneCollection(st.shard0, [
    {_id: "stays on shard0", oldKey: -10, newKey: -10},
    {_id: "moves to shard0", oldKey: 10, newKey: -10},
]);

testReshardCloneCollection(st.shard1, [
    {_id: "moves to shard1", oldKey: -10, newKey: 10},
    {_id: "stays on shard1", oldKey: 10, newKey: 10},
]);

// The temporary reshard collection must be dropped before checking metadata integrity.
assert(temporaryReshardingCollection.drop());

st.stop();
